package com.migrate.module.migrate;

import com.migrate.module.config.ApplicationContextUtil;
import com.migrate.module.domain.*;
import com.migrate.module.enumeration.BinlogType;
import com.migrate.module.enumeration.ConsumerStatus;
import com.migrate.module.enumeration.DBChannel;
import com.migrate.module.enumeration.OperateType;
import com.migrate.module.exception.BusinessException;
import com.migrate.module.mapper.migrate.EtlBinlogConsumeRecordMapper;
import com.migrate.module.service.MigrateConfigService;
import com.migrate.module.service.MigrateService;
import com.migrate.module.util.MigrateCheckUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 对数据合并，并写入存储
 *
 * @author zhonghuashishan
 */
@Slf4j
public class MergeBinlogWrite {
    /**
     * 最大发送数
     */
    private static final Integer MAX_SEND = 500;
    /**
     * 比对时间戳
     */
    private static final String CREATE_TIME_STAMP = "create_time";

    private static final String TIME_STAMP = "update_time";
    /**
     * 分隔符
     */
    private static final String SPLIT_KEY = "&";
    /**
     * 存储需要同步的数据对象
     */
    private List<BinLog> binlogDataList;
    /**
     * 用于存储过滤最新的数据
     */
    private final Map<String, BinLog> binlogDataMap = new HashMap<>(2048);


    /**
     * 本次同步的返回对象
     */
    private ScrollInfo scrollInfo = new ScrollInfo();
    /**
     * 存储本次需要更新的消息对象信息
     */
    private List<EtlBinlogConsumeRecord> etlBinlogConsumeRecordList = new ArrayList<>();

    /**
     * 对同步的数据进行合并，转换
     *
     * @param binlogData MySQL的binlog对象
     */
    public void mergeBinlog(BinlogData binlogData) {
        // 我们此时从queue拿到的所有数据，可能 是包含了一条数据的多个binlog
        // 我们在这里先对多个binlog做一个merge合并
        // 把每条数据的binlog先放在一个map里，下一条数据，如果万一说拿到这条数据的binlog

        List<Map<String, Object>> dataList = binlogData.getDataMap();
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        String key = MergeConfig.getSingleKey(binlogData.getTableName());

        for (Map<String, Object> dataMap : dataList) {
            // 每次都需要先加入到集合中，用于处理完后，批量更新
            etlBinlogConsumeRecordList.add(binlogData.getConsumeRecord());

            // 先获取这笔同步记录的唯一标识字段
            // rocketmq里有一个topic -> table -> 标识字段值（订单编号）
            String mergeKey = dataMap.get(key) + SPLIT_KEY + binlogData.getTableName() + SPLIT_KEY + binlogData.getConsumeRecord().getTopic();
            // 验证是否在这批同步的数据当中，有相同的更新记录
            BinLog getBinlogData = binlogDataMap.get(mergeKey);
            if (!ObjectUtils.isEmpty(getBinlogData)) {
                // 判断历史的记录的操作时间 是否大于本次同步的操作时间
                // 上一次放到map里的binlog，他是比较新的，此时这条binlog是旧的，不要去做任何处理
                // insert update1 update2
                // update2先进来map，update1后进来，就不要去管他了
                if (getBinlogData.getOperateTime().compareTo(binlogData.getOperateTime()) > 0) {
                    continue;
                }
            }
            // 将数据转换为单条log对象
            BinLog binLog = buildBinLog(binlogData, dataMap);
            // 所以说在这里的merge，其实就是对一条数据的多个binlog，按照时间先后顺序，去做一个覆盖
            binlogDataMap.put(mergeKey, binLog); // topic->table->数据标识，binlog，map
        }
    }

    /**
     * 全量的数据同步
     */
    public ScrollInfo load(List<Map<String, Object>> scrollList, RangeScroll rangeScroll) {
        // 过滤掉非需要匹配的时间断的数据 如果过滤后数据为空,则直接返回。
        filterCreateTime(scrollList, rangeScroll);
        if (scrollList.size() == 0) {
            return scrollInfo;
        }
        // 数据转换为增量的模型
        transformModel(scrollList, rangeScroll.getTableName());
        // 对数据和新库进行过滤
        filterBinlogAging(OperateType.ALL, rangeScroll.getDomain());
        // 写入新库
        write(OperateType.ALL, rangeScroll);
        return scrollInfo;
    }


    /**
     * 首先拿到当前页最大的滚动ID，然后在过滤掉不是查询的时间区域的数据，防止全部数据都被过滤掉
     *
     * @param scrollList
     * @param rangeScroll
     */
    private void filterCreateTime(List<Map<String, Object>> scrollList, RangeScroll rangeScroll) {
        String key = MergeConfig.getSingleKey(rangeScroll.getTableName());
        scrollInfo.setMaxScrollId(scrollList.get(scrollList.size() - 1).get(key).toString());

        Iterator<Map<String, Object>> iterator = scrollList.iterator();
        while (iterator.hasNext()) {
            Map<String, Object> scrollMap = iterator.next();
            Date createTime = (Date) scrollMap.get(CREATE_TIME_STAMP);
            if (createTime.compareTo(rangeScroll.getStartTime()) < 0 || createTime.compareTo(rangeScroll.getEndTime()) > 0) {
                iterator.remove();
            }
        }
        scrollInfo.setScrollSize(scrollList.size());
    }

    /**
     * 对 合并后的数据进行验证是否为过时数据
     */
    public void filterBinlogAging(OperateType operateType, String domain) {
        // 批量查询数据，是否存在新库，并返回匹配的数据集合
        // 把我们merge完这一批数据，根据key从目标库里查询出来，后续可以用作比对
        Map<String, Map<String, Object>> respMap = batchQuery(operateType, domain);

        // binlogDataMap是我们之前从queue里拿出来的所有的数据，merge
        // respMap，对这批数据从目标库里做一个查询，看看这批数据在目标库里是什么样子的

        // 开始核对数据是否已经存在库中，并验证谁的时间最新过滤失效数据
        for (Map.Entry<String, BinLog> entry : binlogDataMap.entrySet()) {
            BinLog binLog = entry.getValue();
            //当前同步要处理的表名称
            String tableName = binLog.getTableName();
            // 判断同步的数据库中，是否存新库已存在
            // 本次binlog，在目标库里已经存在了一条数据了
            if (!CollectionUtils.isEmpty(respMap) && respMap.containsKey(entry.getKey())) {
                //当前同步的这条记录
                Map<String, Object> binLogMap = binLog.getDataMap();
                // 新库被查询到的记录
                Map<String, Object> targetMap = respMap.get(entry.getKey());
                // 处理同步的记录是否需要执行，如果同步的时间大于新库的时间，则代表需要更新,删除的数据不比对时间
                if (!MigrateCheckUtil.comparison(binLogMap, targetMap, tableName)
                        && !BinlogType.DELETE.getValue().equals(binLog.getOperateType())) {
                    // 第二种情况，如果我当前这条binlog，是针对你的delete删除操作，binlog是一定要处理的
                    continue;
                }

                // 第一种情况，binlog比目标库的数据是要新的，operate time是新的，update更新操作
                binLog.setOperateType(BinlogType.UPDATE.getValue());
            } else {
                // binlog在目标库里不存在，就说明我是要执行插入insert操作
                // 目标库里数据不存在，我的operate type=insert，如果我是update，手工调整为insert
                // 我是delete，直接返回就可以了，目标库里不存在这条数据，此时删除操作就不要去做了

                //数据在新库不存在，对多条数据的最后一条结果集的类型为update，需要更正为insert，如果是delete则略过
                if (BinlogType.UPDATE.getValue().equals(binLog.getOperateType())) {
                    binLog.setOperateType(BinlogType.INSERT.getValue());
                }
                if (BinlogType.DELETE.getValue().equals(binLog.getOperateType())) {
                    continue;
                }
            }

            // 将需要写入的数据添加到集合中
            binlogDataList.add(binLog);
        }

    }

    /**
     * 批量查询已存在新库的数据
     *
     * @return 已存在新库的数据
     */
    private Map<String, Map<String, Object>> batchQuery(OperateType operateType, String domain) {
        // 先获取本次迁移的全部唯一key
        List<String> keyStrList = new ArrayList<>(binlogDataMap.keySet());
        binlogDataList = new ArrayList<>(keyStrList.size());

        // 先对这批数据，按照表名做一个分组
        Map<String, List<String>> keyMap = new HashMap<>();
        //增量处理和全量处理
        if (operateType == OperateType.ADD) {
            // 增量的数据进行分组处理(按表粒度分组)
            // 按表为粒度来进行分组，就是一个表的多条数据分为一组
            keyMap = groupIncreMentTable(keyStrList);
        } else if (operateType == OperateType.ALL) {
            // 全量数据进行分组(只是模型转换的概念)
            keyMap = groupAllTable(keyStrList);
        }

        Map<String, Map<String, Object>> targetMap = new HashMap<>();
        MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);

        List<Map<String, Object>> targetAllList = new ArrayList<>();
        for (Map.Entry<String, List<String>> mapEntry : keyMap.entrySet()) {
            String dataBaseKey = mapEntry.getKey();
            String[] split = dataBaseKey.split(SPLIT_KEY);
            // 获取topic和对应的表
            String tableName = null;
            String topic = null;
            if (operateType == OperateType.ADD){
                topic = split[0];
                tableName = split[1];
            }else{
                tableName = split[0];
            }

            List<String> keyList = mapEntry.getValue();
            //数据切割，每次查询200条数据
            // 表->key list，可能有很多条，条数太多了以后，如果我去做批量的查询，一次查询的量太大了
            // 去对他key list做一个切割，按200条为一个单位，切割成多个批次
            // 分为多个批次，把表对应的所有数据，从目标库表里查询出来，我需要做一些比对，我当亲debinlog，是不是比你的目标库里的数据要旧
            int limit = countStep(keyList.size());
            //切割成多个集合对象
            // java8表达式，流处理的方式，按照我们指定的批次，拆分为了多个批次
            // 语法糖
            List<List<String>> splitList = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(
                    a -> keyList.stream().skip((long) a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList());

            // 获取对应的业务域 滚动对象
            RangeScroll scrollConfig = buildRangeScroll(tableName, topic,domain);
            // 分页查询数据
            for (List<String> strings : splitList) {
                List<Map<String, Object>> targetList = migrateService.findByIdentifiers(scrollConfig, strings, DBChannel.CHANNEL_2.getValue());
                targetAllList.addAll(targetList);
            }

            String keyValue = MergeConfig.getSingleKey(tableName);
            for (Map<String, Object> target : targetAllList) {
                String mapKey = target.get(keyValue) + "";
                targetMap.put(mapKey + SPLIT_KEY + tableName, target);
            }
        }
        return targetMap;
    }

    /**
     * 根据业务系统和topic和表，构造当前所需的对象
     *
     * @param tableName
     * @param topic
     * @param domain
     * @return
     */
    private RangeScroll buildRangeScroll(String tableName, String topic,String domain) {
        MigrateConfigService migrateConfigService = ApplicationContextUtil.getBean(MigrateConfigService.class);
        RangeScroll scrollConfig;
        if (topic != null){
            scrollConfig = migrateConfigService.getScrollConfig(topic);
        }else{
            scrollConfig = new RangeScroll();
            scrollConfig.setDomain(domain);
        }
        scrollConfig.setTableName(tableName);
        scrollConfig.setTargetTableName(migrateConfigService.getSourceTableName(scrollConfig.getDomain(),tableName));
        // 获取该表的唯一关键字段名称
        String singleKey = MergeConfig.getSingleKey(tableName);
        scrollConfig.setScrollName(singleKey);

        return scrollConfig;
    }

    /**
     * 对数据进行写入
     */
    public void write(OperateType operateType, RangeScroll rangeScroll) {
        // 先按表，将数据进行分组
        Map<String, List<BinLog>> binLogMap = binlogDataList.stream().collect(Collectors.groupingBy(BinLog::getTableName));
        boolean isWrite = true;
        // 遍历不同写入表的集合对象
        for (Map.Entry<String, List<BinLog>> mapEntry : binLogMap.entrySet()) {
            String tableName = mapEntry.getKey();
            List<BinLog> binLogList = mapEntry.getValue();
            String topic = binLogList.get(0).getTopic();
            // 全量的是从外部带入的参数，增量的通过表名和topic构建
            if (Objects.isNull(rangeScroll)) {
                rangeScroll = buildRangeScroll(tableName, topic,null);
            }
            MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
            // 批量写入
            boolean isFlag = migrateService.migrateBat(rangeScroll, binLogList);
            // 有一次更新失败，本批次的offset都不更新状态
            if (!isFlag) {
                isWrite = false;
            }
        }
        // 批量更新offset的标志，如果更新过程中有一个批次是失败的，都不能更新掉本地同步的offset，待下次拉取的时候更新
        if (isWrite) {
            if (OperateType.ADD == operateType) {
                updateConsumeRecordStatus();
            }
        } else {
            //如果有更新失败todo 抛出异常，暂停任务，等排查出问题后继续进行
            throw new BusinessException("全量数据写入失败");
        }
    }

    /**
     * 更新消息队列的offset标志为已完成
     */
    private void updateConsumeRecordStatus() {
        EtlBinlogConsumeRecordMapper etlBinlogConsumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class);
        Map<Integer, List<EtlBinlogConsumeRecord>> integerListMap = etlBinlogConsumeRecordList.stream().collect(Collectors.groupingBy(EtlBinlogConsumeRecord::getQueueId));
        for (Map.Entry<Integer, List<EtlBinlogConsumeRecord>> mapEntry : integerListMap.entrySet()) {
            Integer queueId = mapEntry.getKey();
            List<EtlBinlogConsumeRecord> etlBinlogConsumeRecordList = mapEntry.getValue();
            // 批量更新
            etlBinlogConsumeRecordMapper.batchUpdateConsumeRecordStatus(queueId, ConsumerStatus.CONSUME_SUCCESS.getValue(), etlBinlogConsumeRecordList);
        }
    }

    /**
     * 增量数据进行分组
     * @return
     */
    private  Map<String, List<String>> groupIncreMentTable(List<String> keyStrList){
        Map<String, List<String>> keyMap = new HashMap<>();
        //筛选按表为维度的集合
        for (String keyStr : keyStrList) {
            String[] split = keyStr.split(SPLIT_KEY);
            List<String> keyList;
            String key = split[0];
            String tableName = split[1];
            String topic = split[2];
            if (keyMap.containsKey(topic + SPLIT_KEY + tableName)) {
                keyList = keyMap.get(topic + SPLIT_KEY + tableName);
                keyList.add(key);
            } else {
                keyList = new ArrayList<>();
                keyList.add(key);
            }
            // 每一个表对应的多条数据的标识，订单表
            // order_topic+order_info，list<订单编号, 订单编号, 订单编号>
            keyMap.put(topic + SPLIT_KEY + tableName, keyList);
        }
        return keyMap;
    }
    /**
     * 全量数据进行分组(实际上)
     * @return
     */
    private  Map<String, List<String>> groupAllTable(List<String> keyStrList){
        Map<String, List<String>> keyMap = new HashMap<>();
        //筛选按表为维度的集合
        for (String keyStr : keyStrList) {
            String[] split = keyStr.split(SPLIT_KEY);
            List<String> keyList;
            String key = split[0];
            String tableName = split[1];
            if (keyMap.containsKey(tableName)) {
                keyList = keyMap.get(tableName);
                keyList.add(key);
            } else {
                keyList = new ArrayList<>();
                keyList.add(key);
            }
            keyMap.put(tableName, keyList);
        }
        return keyMap;
    }
    /**
     * 模型转换
     *
     * @param scrollList 滚动List
     * @param tableName  表名
     */
    private void transformModel(List<Map<String, Object>> scrollList, String tableName) {
        String key = MergeConfig.getSingleKey(tableName);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        try {
            for (Map<String, Object> scrollMap : scrollList) {
                BinLog binLog = new BinLog();
                binLog.setOperateType(BinlogType.INSERT.getValue());
                binLog.setDataMap(scrollMap);
                binLog.setOperateTime(format.parse(scrollMap.get(TIME_STAMP) + "").getTime());
                binLog.setTableName(tableName);
                binLog.setKey(key);
                binlogDataMap.put(scrollMap.get(key) + SPLIT_KEY + tableName, binLog);
            }
        } catch (Exception e) {
            log.error("模型转换出错", e);
        }
    }


    /**
     * 转换成单条存储的sql变更对象
     *
     * @param binlogData MySQL的binlog对象
     * @param dataMap    单条sql的信息
     * @return binlog对象
     */
    private BinLog buildBinLog(BinlogData binlogData, Map<String, Object> dataMap) {
        BinLog binLog = new BinLog();
        binLog.setDataMap(dataMap);
        binLog.setOperateTime(binlogData.getOperateTime());
        binLog.setOperateType(binlogData.getOperateType());
        binLog.setTableName(binlogData.getTableName());
        binLog.setTopic(binlogData.getConsumeRecord().getTopic());
        return binLog;
    }

    /**
     * 计算切分次数
     */
    private static Integer countStep(Integer size) {
        return (size + MAX_SEND - 1) / MAX_SEND;
    }


}
